---
layout: post
date: 2020-01-30
title: "Using Messages In Elixir To Avoid Polling"
description: "Using messages to avoid polling while still supporting buffering"
tags: [design]
---

<p>In the last post we talked about the <a href="/Consistent-Database-and-Message-Queue-Writes/">Transactional Outbox pattern</a> which can help keep DB and queue writes consistent by using the database as a staging area for your queue.</p>

<p>That post left it up to you to figure out how you'd trigger the code that gets messages out of your outbox table and into your queue. In this post we'll look at a specific implementation based on Elixir messages.</p>

<p>Generalizing from the last post, we essentially have some work that needs to be done asynchronously. In our specific case, the work is queued in a database, but the solutions we'll look at here aren't tied to that. The simplest solution is to poll:</p>

{% highlight elixir %}
def run() do
  :timer.sleep(1000)
  rows = DB.query("delete from outbox ... returning *")
  process_rows(rows)
  run()
end{% endhighlight %}

<p>The problem with polling is that you're wasting resources whenever there's no work to do, and when there is work, it can sit for up to a full interval before being picked up.</p>

<p>An alternative approach is to spawn a thread/goroutine/process as the work arrives. Or, in the case of Go and Elixir, we could use channels or messages respectively to trigger the action from a dedicated goroutine/process. For example, we can modify our transaction code from the last post to include the signal:</p>

{% highlight elixir %}DB.transaction(fn conn ->
  DB.query!(conn, "insert into users ....")

  DB.query!(conn, "
    insert into outbox (route, payload)
    values ($1, $2)
  ", ["user.create", payload])

  Outbox.process() # <--- added
end)

# Our new process
defmodule Outbox do
  use GenServer

  def process() do
    GenServer.cast(__MODULE__, :process)
  end

  def handle_cast(:process, state) do
    run()
    {:noreply, state}
  end

  defp run() do
    rows = DB.query("delete from outbox ... returning *")
    process_rows(rows)
  end
end
{% endhighlight %}

<p>One problem here is that if the processing fails (or the entire app crashes or is redeployed without a clean shutdown) we have to wait until the next time <code>Outbox.process/0</code> is called to do our work.</p>

<p>The Elixir "way" to solve this is to "let it crash". If we make it so the only way for our <code>handle_cast/2</code> to fail is to crash, then our Outbox will be restarted by its supervisor. During startup, Outbox can check for any pending work:</p>

{% highlight elixir %}defmodule Outbox do
  use GenServer

  def init(_) do
    {:ok, nil, {:continue, :recover}}
  end

  def handle_continue(:recover, state) do
    run()
    {:noreply, state}
  end

  # same as before
end{% endhighlight %}
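
<p>For the restart to actually happen, Outbox has to be started under a supervisor. Here's a minimal sketch of that wiring. It assumes a <code>start_link/1</code> that registers the process under its module name (which the <code>GenServer.cast(__MODULE__, ...)</code> calls rely on) and a made-up <code>MyApp.Supervisor</code> name; <code>run/0</code> is stubbed out:</p>

{% highlight elixir %}defmodule Outbox do
  use GenServer

  # Register under the module name so GenServer.cast(__MODULE__, ...) finds us.
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    {:ok, nil, {:continue, :recover}}
  end

  @impl true
  def handle_continue(:recover, state) do
    run()
    {:noreply, state}
  end

  # Stub for this sketch; the real version drains the outbox table.
  defp run(), do: :ok
end

# `use GenServer` gives us a child_spec/1 with restart: :permanent, so the
# supervisor restarts Outbox after a crash and init/1 runs again.
# MyApp.Supervisor is a hypothetical name.
{:ok, _sup} =
  Supervisor.start_link([Outbox], strategy: :one_for_one, name: MyApp.Supervisor)
{% endhighlight %}

<p>In a real application the <code>Outbox</code> child would more likely sit in your application's existing supervision tree than under its own supervisor.</p>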

<p>If you're willing to live with a bit of latency, we can improve this by adding some batching. The simplest solution would be to make a small change to the original, from:</p>

{% highlight elixir %}def process() do
  GenServer.cast(__MODULE__, :process)
end

def handle_cast(:process, state) do
  run()
  {:noreply, state}
end{% endhighlight %}

<p>to:</p>

{% highlight elixir %}def process() do
  pid = Process.whereis(__MODULE__)
  Process.send_after(pid, :process, 2_000)
end

def handle_info(:process, state) do
  run()
  {:noreply, state}
end{% endhighlight %}

<p>We're delaying the signal by 2 seconds. However, if <code>process/0</code> is called 100 times during those 2 seconds, we still call <code>run/0</code> 100 times. This isn't optimal. Consider this tweak:</p>

{% highlight elixir %}def process() do
  GenServer.cast(__MODULE__, :process)
end

# our state is nil, there's no pending signal
def handle_cast(:process, nil) do
  ref = Process.send_after(self(), :process, 2_000)
  {:noreply, ref}
end

# our state is not nil, there's already a pending signal, don't do anything
def handle_cast(:process, ref) do
  {:noreply, ref}
end

# after we're done processing, we set the state back to nil to allow
# new signals
def handle_info(:process, _ref) do
  run()
  {:noreply, nil}
end{% endhighlight %}

<p>We use our process's state to track whether we have a pending signal or not. Because a process handles one message at a time, we're guaranteed that our <code>handle_info/2</code> and <code>handle_cast/2</code> will never execute concurrently. A signal that arrives while <code>run/0</code> is executing simply waits in the mailbox and schedules a new run once the state is back to <code>nil</code>. This means that we'll never miss a signal and won't needlessly execute <code>run/0</code> (depending on how you pull your batch of work, <code>run/0</code> might be called needlessly 1 extra time, but that's a lot better than N extra times).</p>
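
<p>To see the batching in action without a database, here's a rough, self-contained variation. The <code>DebouncedOutbox</code> name and the <code>run</code> and <code>delay</code> options are made up for this sketch, and the state is a map rather than a bare ref so it can carry those options; the pending/not-pending logic is the same as above:</p>

{% highlight elixir %}defmodule DebouncedOutbox do
  use GenServer

  # `run` and `delay` are hypothetical options, added so the sketch can be
  # exercised without a database or a 2 second wait.
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def process(), do: GenServer.cast(__MODULE__, :process)

  @impl true
  def init(opts) do
    run = Keyword.fetch!(opts, :run)
    delay = Keyword.get(opts, :delay, 2_000)
    {:ok, %{run: run, delay: delay, ref: nil}}
  end

  # no pending signal: schedule one
  @impl true
  def handle_cast(:process, %{ref: nil} = state) do
    ref = Process.send_after(self(), :process, state.delay)
    {:noreply, %{state | ref: ref}}
  end

  # there's already a pending signal, don't do anything
  def handle_cast(:process, state), do: {:noreply, state}

  # do the work, then clear the ref to allow new signals
  @impl true
  def handle_info(:process, state) do
    state.run.()
    {:noreply, %{state | ref: nil}}
  end
end

# Count how often the work function runs.
{:ok, counter} = Agent.start_link(fn -> 0 end)
work = fn -> Agent.update(counter, &(&1 + 1)) end
{:ok, _pid} = DebouncedOutbox.start_link(run: work, delay: 200)

# A burst of 100 signals, all well inside the 200ms window.
for _ <- 1..100, do: DebouncedOutbox.process()
Process.sleep(500)

IO.inspect(Agent.get(counter, & &1), label: "runs")
{% endhighlight %}

<p>The whole burst lands in the mailbox long before the timer fires, so the work function should run once rather than 100 times.</p>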

<p>The explicit interface for handling messages (handle_XXX) and the process state, combined with powerful pattern matching, makes the flow of our code based on the state equally explicit. The code is easier to reason about and easier to test.</p>
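
<p>As a rough illustration of the testing point: the callbacks are plain functions of a message and a state, so they can be called directly without starting the process. The sketch below repeats the batching callbacks with a stubbed <code>run/0</code>; the test module and test names are made up:</p>

{% highlight elixir %}ExUnit.start()

defmodule Outbox do
  use GenServer

  @impl true
  def init(_), do: {:ok, nil}

  @impl true
  def handle_cast(:process, nil) do
    ref = Process.send_after(self(), :process, 2_000)
    {:noreply, ref}
  end

  def handle_cast(:process, ref), do: {:noreply, ref}

  @impl true
  def handle_info(:process, _ref) do
    run()
    {:noreply, nil}
  end

  # Stub for this sketch; the real version drains the outbox table.
  defp run(), do: :ok
end

defmodule OutboxTest do
  use ExUnit.Case

  test "the first signal schedules a timer" do
    {:noreply, ref} = Outbox.handle_cast(:process, nil)
    assert is_reference(ref)
    # cancel the timer so the test process doesn't receive :process later
    assert is_integer(Process.cancel_timer(ref))
  end

  test "a pending signal is left untouched" do
    ref = make_ref()
    assert {:noreply, ^ref} = Outbox.handle_cast(:process, ref)
  end

  test "processing clears the pending signal" do
    assert {:noreply, nil} = Outbox.handle_info(:process, make_ref())
  end
end
{% endhighlight %}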
